package com.go.utils;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * ClassName: MyKafkaDeserializationSchema
 * Description:
 * Date: 2022/1/6
 * @author: Cason
 */
public class MyKafkaDeserializationSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

    /**
     * Separator between topic, partition and offset in the unique key.
     * Without it, ("t", 1, 23) and ("t", 12, 3) both concatenate to "t123"
     * and the key is no longer unique.
     */
    private static final String KEY_SEPARATOR = "-";

    /**
     * A Kafka stream is unbounded, so no element ever marks end-of-stream.
     */
    @Override
    public boolean isEndOfStream(Tuple2<String, String> nextElement) {
        return false;
    }

    /**
     * Deserializes a Kafka record into (uniqueKey, payload), where the key
     * "topic-partition-offset" uniquely identifies the message.
     *
     * @param record one message from the Kafka topic; its value may be null
     *               (a tombstone / log-compaction delete marker)
     * @return Tuple2 of (topic-partition-offset, value decoded as UTF-8;
     *         empty string when the value is null)
     * @throws Exception declared by the interface; not thrown here
     */
    @Override
    public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        String uniqueKey =
                record.topic() + KEY_SEPARATOR + record.partition() + KEY_SEPARATOR + record.offset();

        // Tombstone records carry a null value; map them to "" instead of
        // throwing an NPE. Decode with an explicit charset — the no-arg
        // String(byte[]) constructor uses the platform-default charset.
        byte[] value = record.value();
        String payload = value == null ? "" : new String(value, StandardCharsets.UTF_8);

        return Tuple2.of(uniqueKey, payload);
    }

    /**
     * Tells Flink the produced element type; the anonymous TypeHint captures
     * the generic Tuple2&lt;String, String&gt; despite type erasure.
     */
    @Override
    public TypeInformation<Tuple2<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
    }
}
